[AWS IoT Core] MQTT v5 を使用してリクエスト・レスポンス パターンを実装して見ました
1 はじめに
CX 事業本部のデリバリー部の平内(SIN)です。
AWS IoT Code で新規に対応となった MQTT v5 では、大規模なデバイス展開の通信強化や、メッセージングパターンを革新すると言われています。
正直なところ、MQTT v5 に触れたことがなく、なかなかイメージが湧かなかったので、今回は、勉強のため、メッセージングパターン拡張の1つとして挙げられる、「リクエスト・レスポンス」のパターンを実装してみました。
最初に、作成したサンプルが動作している様子です。 右側の縦長のコンソールがサーバで、左の 3 つが、3 個のクライアントをイメージしています。 後で確認していただくと、分かると思うのですが、サーバ側のプログラムは、特にクライアントを区別しないようにコーディングされているのですが、それぞれのクライアントは、自分の Subscribe したトピックに、送信時に設定したシーケンス番号(固有情報)と共に、受信できています。
なお、現時点(2022/11/30)では、AWS で提供される SDK は、MQTT v5 に対応していないため、サンプル作成には、paho.mqtt を使用させて頂きました。
参考:https://dev.classmethod.jp/articles/aws-iot-core-mqtt-v5-sdk-for-python
※ 本記事では、リクエスト・レスポンスの方向を表現するため、MQTT 通信を行う主体を「クライアント」「サーバ」のように表現していますが、MQTT では、単純に Subscriber 及び Publisher であることにご注意ください。
2 Request Response
HTTP 等を利用する Web アプリケーションでは、送信側と受信側が直接通信されるため、「リクエスト」に対する「レスポンス」という関係が常に成り立ちます。しかし、MQTT は、Publish/Subscribe で通信するため、特に作り込みが無い場合、このような関係に基づいた処理は実現できません。
MQTT v3.1 でも、必要な情報を追加することで、特定の「リクエスト」に基づく「レスポンス」を判別することは可能ですが、情報を付加できる場所は、Topic 若しくは、 Payload となり、本来の実装に少なからず影響することは避けられないでしょう。
MQTT v5 では、新規にプロパティ領域が追加され、リクエスト・レスポンスのパターンを支援するためのKeyがあります。これを使用する事で、スマートに実装が可能となっています。
3 新規追加プロパティ
MQTT v5 で新しく規定されたプロパティのうち、リクエスト・レスポンスパターンに関連するのは、以下の 4 つです。
値 | 名称 | 説明 | データ型 | コマンド |
---|---|---|---|---|
0x08 | Response Topic | Request-Response の返信 Topic | String | PUBLISH, Will |
0x09 | Correlation Data | Request-Response の相互共有情報 | Binary | PUBLISH, Will |
0x19 | Request Response Information | Request-Response 参照情報の要求 | Byte | CONNECT |
0x1A | Response Information | Request-Response 参照情報 | String | CONNACK |
※ 0x19 及び、0x1A については、今回実装しておりません。
(1) Response Topic
通常、クライアントは、特定の(ワイルドカードは使用しない)Topic を Subscribe して、サーバからのデータを待ち受けます。
プロパティResponse Topicを使用すれば、返信を希望する Topic をここに設定することで、待ち受けているトピックへの返信を要求することができます。
なお、Response Topicは、単なるプロパティ値であり、サーバ側で、この値を使用して Publish する実装は、開発者の作業となります。
リクエスト・レスポンスが構成できるということは、サーバ側で「クライアントに応じた Topic を出し分ける」というようなコストも、大きく削減されるかも知れません。
(2) Correlation Data
クライアント側でCorrelation Dataに何らかの識別情報等を付与して送信し、サーバ側で、これを折り返すことで、相互の共有情報として利用可能になります。
なお、Correlation Dataを適切に折り返す実装も、開発者の作業となります。
図では、送信データにシーケンス番号を付与し、クライアントで受け取ったレスポンスが、どの送信に基づくものであるかを判定しています。
4 実装例
以下が、実装したコードです。
server.py を 1 つ起動して、複数の client.py が実行可能ですが、server.py 側では、client 固有の処理は記述されていません。
client.py は、パラメータにクライアント ID を指定して、Subscribe する Topic 名に使用しています。
% python3 client.py clientId
server.py
import ssl import time import os import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes import time endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def on_publish(client, userdata, mid): print("on_publish") def on_connect(client, userdata, flags, reasonCode,properties=None): print("on_connect flags:{} properties:{} reasonCode: {}".format(flags, properties, reasonCode)) def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None): print("on_subscribe") def on_disconnect(client, userdata, rc,properties): print('on_disconnect {} {} {} {}'.format(client, userdata, rc, properties)) def on_unsubscribe(client, userdata, mid, properties, reasonCodes): print('on_unsubscribe') def on_message(client, userdata, message): msg=str(message.payload.decode("utf-8")) print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties.CorrelationData)) properties = Properties(PacketTypes.PUBLISH) # プロパティに設定されたCorrelation Dataを折り返す properties.CorrelationData = message.properties.CorrelationData # プロパティに設定されたトピック名にレスポンスを返す response_topic = message.properties.ResponseTopic client.publish(response_topic, "response from server", properties = properties) def main(): client_id = "server" sub_topic = "sensor/server" client = mqtt.Client(client_id, protocol = mqtt.MQTTv5) client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect client.on_subscribe = on_subscribe client.on_publish = on_publish client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) properties=None client.connect(endpoint, port, properties = properties) time.sleep(3) # 少しの待機が必要 client.subscribe(sub_topic, qos=0) client.loop_start() while(True): time.sleep(1) print("loop") if __name__ == "__main__": main()
client.py
import ssl import time import json import os import sys import paho.mqtt.client as mqtt from paho.mqtt.properties import Properties from paho.mqtt.packettypes import PacketTypes endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" port = 8883 dir = os.path.dirname(os.path.abspath(__file__)) certs = { "cafile": "{}/certificates/AmazonRootCA1.pem".format(dir), "certfile": "{}/certificates/client-cert.pem".format(dir), "keyfile": "{}/certificates/private-key.pem".format(dir), } def on_publish(client, userdata, mid): print("on_publish") def on_connect(client, userdata, flags, reasonCode,properties=None): print("on_connect flags:{} properties:{} reasonCode: {}".format(flags, properties, reasonCode)) def on_subscribe(mqttc, userdata, mid, granted_qos, properties=None): print("on_subscribe") def on_disconnect(client, userdata, rc,properties): print('on_disconnect {} {} {} {}'.format(client, userdata, rc, properties)) def on_unsubscribe(client, userdata, mid, properties, reasonCodes): print('on_unsubscribe') def on_message(client, userdata, message): msg=str(message.payload.decode("utf-8")) print('on_message topic:{} {} {}'.format(message.topic, msg, message.properties.CorrelationData)) def main(): args = sys.argv if 2 != len(args): print("use: client CLIEND_ID") exit() client_id = args[1] pub_topic = "sensor/server" sub_topic = "sensor/{}".format(client_id) client = mqtt.Client(client_id, protocol = mqtt.MQTTv5) client.on_connect = on_connect client.on_message = on_message client.on_disconnect = on_disconnect client.on_subscribe = on_subscribe client.on_publish = on_publish client.tls_set(certs["cafile"], certfile=certs["certfile"], keyfile=certs["keyfile"], cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None) client.connect(endpoint, port, properties = None) time.sleep(3) # 少しの待機が必要 client.subscribe(sub_topic, qos=0) client.loop_start() time.sleep(1) # プロパティに戻りのトピック名をセットする properties = Properties(PacketTypes.PUBLISH) properties.ResponseTopic = sub_topic for i in range(5): properties.CorrelationData = json.dumps({"seq":i}).encode('utf-8') client.publish(pub_topic, "message from {}".format(client_id), properties=properties) time.sleep(3) client.unsubscribe(sub_topic) client.disconnect() if __name__ == "__main__": main()
5 最後に
今回、MQTT v5 を使用して、「リクエスト・レスポンス」のパターンを実装してみました。
実装して見て分かったのは、MQTT v5 で新規に利用できるようになった機能は、あくまで、実装が可能になったと言う意味だと言う事です。
他に利用可能になった機能についても、順次確認を進めたいと思います。
6 参考にさせて頂いたリンク
MQTT Version 5.0 のリクエスト・レスポンスパターンを試してみた
Introducing new MQTTv5 features for AWS IoT Core to help build flexible architecture patterns
MQTT 5 supported features
[AWS IoT Core] MQTT v5 を使用してユーザープロパティを実装して見ました
[AWS IoT Core] MQTT v5 を使用してトピック・エイリアスを実装して見ました
[AWS IoT Core] MQTT v5 を使用してメッセージ及び、セッション有効期限とクリーンスタートを実装して見ました